{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# HTML Render"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%html\n",
    "<!-- Silencing error messages in the notebook -->\n",
    "<style>\n",
    "div.output_stderr {\n",
    "background: #ffdd;\n",
    "display: none;\n",
    "}\n",
    "</style>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%html\n",
    "<!-- Making the cells take up the full width of the window -->\n",
    "<style>\n",
    ".container { width:100% !important; }\n",
    "</style>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%html\n",
    "<!-- Changing the font of code cells -->\n",
    "<style>\n",
    ".CodeMirror{font-family: \"Courier New\";font-size: 12pt;}\n",
    "</style>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%html\n",
    "<!-- Changing the size of tables to 20px -->\n",
    "<style>\n",
    ".rendered_html table, .rendered_html td, .rendered_html th {font-size: 20px;}\n",
    "</style>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# System Settings"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from pathlib import Path\n",
    "home = os.path.realpath(str(Path.home()))\n",
    "cwd = os.getcwd()\n",
    "print(f'home: {home}')\n",
    "print(f'cwd: {cwd}')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import pandas as pd\n",
    "\n",
    "pd.set_option('display.max_rows', None)\n",
    "\n",
    "# Convert the os.environ object to a dictionary and then to a DataFrame\n",
    "env_df = pd.DataFrame(list(dict(os.environ).items()), columns=['Environment Variable', 'Value'])\n",
    "\n",
    "# Display the DataFrame\n",
    "from IPython.display import display\n",
    "\n",
    "display(env_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import socket\n",
    "localhost=socket.gethostname()\n",
    "local_ip=socket.gethostbyname(localhost)\n",
    "\n",
    "print(f'localhost: {localhost}')\n",
    "print(f'ip: {local_ip}')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark_version=!head  -n1 $SPARK_HOME/RELEASE | awk '{print $2}'\n",
    "spark_version = spark_version[0]\n",
    "\n",
    "print(f\"Spark version from SPARK_HOME: {spark_version}\")\n",
    "spark_version_short=''.join(spark_version.split('.'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import logging\n",
    "import sys\n",
    "\n",
    "logging.basicConfig(format='%(levelname)s : %(message)s', level=logging.ERROR, stream=sys.stdout)\n",
    "logger = logging.getLogger()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "hdfs_event_dir=''\n",
    "local_event_dir=''\n",
    "\n",
    "def get_spark_eventlog_dir(path):\n",
    "    eventlog_dir = None\n",
    "    eventlog_enabled = False\n",
    "    try:\n",
    "        with open(path, 'r') as f:\n",
    "            for line in f:\n",
    "                if line.startswith('spark.eventLog.dir'):\n",
    "                    eventlog_dir = line.split(' ')[-1].strip()\n",
    "                elif line.startswith('spark.eventLog.enabled'):\n",
    "                    eventlog_enabled = line.split(' ')[-1].strip().lower() == 'true'\n",
    "    except FileNotFoundError:\n",
    "        raise SystemExit(f\"'spark-defaults.conf' not found: {path}\")\n",
    "    if not eventlog_enabled:\n",
    "        raise SystemExit(\"'spark.eventLog.enabled' must be enabled.\")\n",
    "    return eventlog_dir\n",
    "\n",
    "spark_defaults_conf = None\n",
    "\n",
    "if 'SPARK_CONF_DIR' in os.environ:\n",
    "    spark_defaults_conf = os.path.join(os.environ['SPARK_CONF_DIR'], 'spark-defaults.conf')\n",
    "elif 'SPARK_HOME' in os.environ:\n",
    "    spark_defaults_conf = os.path.join(os.environ['SPARK_HOME'], 'conf', 'spark-defaults.conf')\n",
    "\n",
    "if spark_defaults_conf:\n",
    "    event_log_dir = get_spark_eventlog_dir(spark_defaults_conf)\n",
    "    if event_log_dir:\n",
    "        print(f\"spark.eventLog.dir: {event_log_dir}\")\n",
    "        if event_log_dir[:7] == 'hdfs://':\n",
    "            hdfs_event_dir = event_log_dir\n",
    "        elif event_log_dir[:6] == 'file:/':\n",
    "            local_event_dir = event_log_dir[6:]\n",
    "    else:\n",
    "        raise SystemExit(f\"'spark.eventLog.dir' is not configured in {spark_defaults_conf}\")\n",
    "else:\n",
    "    raise SystemExit(\"Cannot get `spark.eventLog.dir`. Neither SPARK_CONF_DIR nor SPARK_HOME defined in envrionment variables.\")\n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Monitor"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import findspark\n",
    "import os\n",
    "\n",
    "findspark.init(os.environ['SPARK_HOME'])\n",
    "os.environ.setdefault('SPARK_SUBMIT_OPTS', '-Dscala.usejavacp=true')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import warnings\n",
    "warnings.filterwarnings('ignore')\n",
    "\n",
    "import atexit\n",
    "import collections\n",
    "import gzip\n",
    "import importlib\n",
    "import json\n",
    "import logging\n",
    "import math\n",
    "import os\n",
    "import pathlib\n",
    "import shutil\n",
    "import signal\n",
    "import subprocess\n",
    "import tempfile\n",
    "import threading\n",
    "import time\n",
    "import timeit\n",
    "import traceback\n",
    "\n",
    "import matplotlib\n",
    "import matplotlib.colors as colors\n",
    "import matplotlib.pyplot as plt\n",
    "import matplotlib.ticker as mtick\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import platform\n",
    "import pyspark\n",
    "import pyspark.sql.functions as F\n",
    "import pyspark.sql.types as T\n",
    "import spylon_kernel\n",
    "from collections import namedtuple\n",
    "from concurrent.futures import ThreadPoolExecutor\n",
    "from datetime import date, datetime\n",
    "from functools import reduce\n",
    "from IPython.display import display, HTML\n",
    "from matplotlib import rcParams\n",
    "from pyspark import SparkConf, SparkContext\n",
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.feature import StringIndexer, VectorAssembler\n",
    "from pyspark.sql import SparkSession, SQLContext, Window\n",
    "from pyspark.sql.functions import col, floor, lit, rank, to_date\n",
    "from pyspark.sql.types import (DoubleType, FloatType, IntegerType,\n",
    "                               StringType, StructField, StructType,\n",
    "                               TimestampType)\n",
    "\n",
    "from spylon_kernel import register_ipython_magics\n",
    "from spylon.spark.utils import SparkJVMHelpers\n",
    "\n",
    "register_ipython_magics()\n",
    "\n",
    "rcParams['font.sans-serif'] = 'Courier New'\n",
    "rcParams['font.family'] = 'Courier New'\n",
    "rcParams['font.size'] = '12'\n",
    "\n",
    "%matplotlib inline\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "code_folding": []
   },
   "outputs": [],
   "source": [
    "import socket\n",
    "import os\n",
    "import sys\n",
    "import json\n",
    "\n",
    "def upload_profile(server, base_dir, appid):\n",
    "    local_profile_dir = os.path.join(home, 'profile')\n",
    "    !mkdir -p {local_profile_dir}\n",
    "    !(cd {local_profile_dir}; rm -f {appid}.tar.gz; tar zcvf {appid}.tar.gz {appid}) >/dev/null 2>&1\n",
    "    \n",
    "    server_local_dir=os.path.join('PAUS', base_dir)\n",
    "    server_local_profile_dir=os.path.join(server_local_dir, 'profile')\n",
    "    server_hdfs_dir=f'/{base_dir}/'\n",
    "\n",
    "    !ssh {server} \"mkdir -p {server_local_profile_dir}\"\n",
    "    !ssh {server} \"cd {server_local_profile_dir} && rm {appid}.tar.gz >/dev/null 2>&1\"\n",
    "    !ssh {server} \"cd {server_local_profile_dir} && rm -r {appid} >/dev/null 2>&1\"\n",
    "    !scp {local_profile_dir}/{appid}.tar.gz {server}:{server_local_profile_dir}/\n",
    "    !ssh {server} \"cd {server_local_profile_dir} && tar zxf {appid}.tar.gz\"\n",
    "    !ssh {server} \"hdfs dfs -mkdir -p {server_hdfs_dir}\"\n",
    "    !ssh {server} \"hdfs dfs -rm -r {server_hdfs_dir}{appid} >/dev/null 2>&1\"\n",
    "    !ssh {server} \"hdfs dfs -put {server_local_profile_dir}/{appid} {server_hdfs_dir}\"\n",
    "    !ssh {server} \"cd {server_local_profile_dir}; rm {appid}.tar.gz; rm -r {appid}\"\n",
    "\n",
    "def killsar(clients):\n",
    "    for l in clients:\n",
    "        out=!ssh $l \"ps aux | grep -w sar | grep -v grep | tr -s ' ' | cut -d' ' -f2\"\n",
    "        for x in out:\n",
    "            !ssh $l \"kill $x > /dev/null 2>&1\"\n",
    "    for l in clients:\n",
    "        out=!ssh $l \"ps aux | grep -w pidstat | grep -v grep | tr -s ' ' | cut -d' ' -f2\"\n",
    "        for x in out:\n",
    "            !ssh $l \"kill $x > /dev/null 2>&1\"\n",
    "    for l in clients:\n",
    "        out=!ssh $l \"ps aux | grep -w perf | grep -v grep | tr -s ' ' | cut -d' ' -f2\"\n",
    "        for x in out:\n",
    "            !ssh root@$l \"kill $x > /dev/null 2>&1\"\n",
    "    for l in clients:\n",
    "        !ssh $l \"emon -stop > /dev/null 2>&1\"\n",
    "\n",
    "def killnumactl(clients):\n",
    "    for l in clients:\n",
    "        out =!ssh $l \"ps aux | grep numactl | grep bash | tr -s ' ' | cut -d' ' -f2\"\n",
    "        for x in out:\n",
    "            !ssh $l \"kill $x > /dev/null 2>&1\"\n",
    "\n",
    "def startmonitor(clients, appid, collect_emon, **kwargs):\n",
    "    local_profile_dir=os.path.join(home, 'profile')\n",
    "    prof=os.path.join(local_profile_dir, appid)\n",
    "    !mkdir -p {prof}\n",
    "    \n",
    "    for l in clients:\n",
    "        !ssh root@{l} date\n",
    "    \n",
    "    killsar(clients)\n",
    "    \n",
    "    if collect_emon:\n",
    "        !cp -f {emon_list} {home}/emon.list\n",
    "        for l in clients:\n",
    "            !scp {home}/emon.list {l}:{home}/emon.list  > /dev/null 2>&1\n",
    "    \n",
    "    perfsyscalls=kwargs.get(\"collect_perf_syscall\",None)\n",
    "    \n",
    "    for l in clients:\n",
    "        prof_client=os.path.join(prof, l)\n",
    "        !mkdir -p {prof_client}\n",
    "        !ssh {l} mkdir -p {prof_client}\n",
    "        !ssh {l} \"sar -o {prof_client}/sar.bin -r -u -d -B -n DEV 1 >/dev/null 2>&1 &\"\n",
    "        !ssh root@{l} \"jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f 1 | xargs -I % bash -c '(cat /proc/%/status >> {prof_client}/%.stat; cat /proc/%/io >> {prof_client}/%.stat)'\"\n",
    "        if collect_emon:\n",
    "            !ssh {l} \"emon -i {home}/emon.list -f {prof_client}/emon.rst >/dev/null 2>&1 & \"\n",
    "        else:\n",
    "            !ssh root@{l} \"perf stat -e 'instructions,cycles,cpu_clk_unhalted.thread,cpu_clk_unhalted.ref_tsc' -a -I 500 -o {prof_client}/perfstat.txt  >/dev/null 2>&1 & \"\n",
    "            !ssh {l} \"cat /sys/devices/system/cpu/cpu0/tsc_freq_khz | xargs -I% echo %000 > {prof_client}/tsc_freq 2>/dev/null &\"\n",
    "            !ssh {l} \"lscpu | grep '^CPU(s):' | cut -d ':' -f 2 | tr -d ' ' > {prof_client}/totalcores 2>/dev/null &\"\n",
    "        if kwargs.get(\"collect_pid\",False):\n",
    "            !ssh {l} \"jps | grep CoarseGrainedExecutorBackend | head -n 1 | cut -d' ' -f 1 | xargs  -I % pidstat -h -t -p % 1  > {prof_client}/pidstat.out  2>/dev/null &\"\n",
    "        !ssh root@{l} 'cat /proc/uptime  | cut -d\" \" -f 1 | xargs -I ^ date -d \"- ^ seconds\"  +%s.%N' > $prof/$l/uptime.txt\n",
    "        if kwargs.get(\"collect_sched\",False):\n",
    "            !ssh root@{l} 'perf trace -e \"sched:sched_switch\" -C 8-15 -o {prof_client}/sched.txt -T -- sleep 10000 >/dev/null 2>/dev/null &'\n",
    "        if perfsyscalls is not None:\n",
    "            !ssh root@{l} \"perf stat -e 'syscalls:sys_exit_poll,syscalls:sys_exit_epoll_wait' -a -I 1000 -o {prof_client}/perfsyscalls.txt  >/dev/null 2>&1 & \"\n",
    "        if kwargs.get(\"collect_hbm\",False):\n",
    "            hbm_nodes = kwargs.get(\"hbm_nodes\")\n",
    "            if hbm_nodes is not None:\n",
    "                print(\"collect_hbm\")\n",
    "                hbm_nodes = '\\|'.join([\"node \" + str(i) for i in hbm_nodes])\n",
    "                %env hbm_numa_nodes={hbm_nodes}\n",
    "                %env hbm_l = {l}\n",
    "                %env hbm_prof = {prof}\n",
    "                !ssh $hbm_l \"echo timestamp, size, free > $hbm_prof/$hbm_l/numactl.csv\"\n",
    "                !ssh $hbm_l \"while :; do echo \\$(numactl -H | grep '$hbm_numa_nodes' | grep 'size' | awk '{ print \\$4 }' | awk '{ s += \\$1 } END { print s }'), \\$(numactl -H | grep '$hbm_numa_nodes' | grep 'free' | awk '{ print \\$4 }' | awk '{ s += \\$1 } END { print s }') | ts '%Y-%m-%d %H:%M:%S,' >> $hbm_prof/$hbm_l/numactl.csv; sleep 1; done >/dev/null 2>&1 &\"\n",
    "            else:\n",
    "                print(\"Missing argument: hbm_nodes. e.g. hbm_nodes = list(range(8,16))\")\n",
    "\n",
    "def stopmonitor(clients, sc, appid, result, collect_emon, **kwargs):\n",
    "    %cd ~\n",
    "    \n",
    "    local_profile_dir=os.path.join(home, 'profile')\n",
    "    prof=os.path.join(local_profile_dir, appid)\n",
    "    !mkdir -p {prof}\n",
    "\n",
    "    killsar(clients)\n",
    "    killnumactl(clients)\n",
    "    \n",
    "    for l in clients:\n",
    "        prof_client=os.path.join(prof, l)\n",
    "        !ssh {l} \"sar -f {prof_client}/sar.bin -r > {prof_client}/sar_mem.sar;sar -f {prof_client}/sar.bin -u > {prof_client}/sar_cpu.sar;sar -f {prof_client}/sar.bin -d -p > {prof_client}/sar_disk.sar;sar -f {prof_client}/sar.bin -n DEV > {prof_client}/sar_nic.sar;sar -f {prof_client}/sar.bin -B > {prof_client}/sar_page.sar;\" \n",
    "        !ssh root@{l} \"jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f 1 | xargs -I % bash -c '(cat /proc/%/status >> {prof_client}/%.stat; cat /proc/%/io >> {prof_client}/%.stat)'\"\n",
    "        if collect_emon:\n",
    "            !ssh {l} \"source ~/sep_install/sep_vars.sh>/dev/null 2>&1; emon -v \" > {prof}/{l}/emonv.txt\n",
    "        !ssh {l} \"sar -V \" > {prof_client}/sarv.txt\n",
    "        !ssh {l} \"test -f {prof_client}/perfstat.txt && head -n 1 {prof_client}/perfstat.txt > {prof_client}/perfstarttime\"\n",
    "        if l!= socket.gethostname():\n",
    "            !scp -r {l}:{prof_client} {prof}/ > /dev/null 2>&1\n",
    "    \n",
    "    if sc is not None:\n",
    "        sc.stop()\n",
    "        \n",
    "    !git --git-dir=\"{gluten_home}/.git\" log --format=\"commit %H%nAuthor: %an <%ae>%nDate:  %cs%n %n %s %n\" --since=`date --date='2 days ago'  +'%m/%d/%Y'` > {prof}/changelog_gluten\n",
    "    !git --git-dir=\"{gluten_home}/ep/build-velox/build/velox_ep/.git\" log --format=\"commit %H%nAuthor: %an <%ae>%nDate:  %cs%n %n %s %n\" --since=`date --date='2 days ago'  +'%m/%d/%Y'` > {prof}/changelog_velox\n",
    "    \n",
    "    with open(f\"{prof}/starttime\",\"w\") as f:\n",
    "        f.write(\"{:d}\".format(int(time.time()*1000)))\n",
    "    \n",
    "    with open(f'{prof}/query_time.json', 'w') as f:\n",
    "        json.dump(result, f)\n",
    "\n",
    "    if hdfs_event_dir != '':\n",
    "        !hadoop fs -copyToLocal {hdfs_event_dir}/{appid} {prof}/app.log\n",
    "    elif local_event_dir != '':\n",
    "        !cp {local_event_dir}/{appid}  {prof}/app.log"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def pinexecutor_numa(clients):\n",
    "    cpunum = !ssh {clients[0]} \"grep 'processor' /proc/cpuinfo | wc -l\"\n",
    "    cpunum = int(cpunum[0])\n",
    "        \n",
    "    numanodes=!ssh {clients[0]} \"cat /sys/devices/system/node/node*/cpulist\"\n",
    "    numanodes = list(filter(lambda x: x != '', numanodes))\n",
    "    print(numanodes)\n",
    "    for client in clients:\n",
    "        pids=!ssh {client} \"jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f1\"\n",
    "        print(client,\":\",len(pids),\" \",\"\\t\".join(map(str,pids)))\n",
    "        \n",
    "        cpunum_c = !ssh {client} \"grep 'processor' /proc/cpuinfo | wc -l\"\n",
    "        cpunum_c = int(cpunum_c[0])\n",
    "        if cpunum_c != cpunum:\n",
    "            print(f\"client {client} cpunum not match!\")\n",
    "            return\n",
    "        numanodes_c=!ssh {client} \"cat /sys/devices/system/node/node*/cpulist\"\n",
    "        numanodes_c = list(filter(lambda x: x != '', numanodes))\n",
    "        time.sleep(1)\n",
    "        print(numanodes_c)\n",
    "        if numanodes_c != numanodes:\n",
    "            print(f\"client {client} numanodes not match!\")\n",
    "            return\n",
    "        \n",
    "        idx = 0\n",
    "        nodes=len(numanodes)\n",
    "        for i in range(nodes):\n",
    "            cpus = numanodes[i]\n",
    "            for l in pids[idx:idx+int(len(pids)/nodes)]: #  executors on 1 numanode\n",
    "                print(f\" {cpus} {l}\")\n",
    "                !ssh {client} \"taskset -a -p -c $cpus $l > /dev/null 2>&1 \"\n",
    "            idx += int(len(pids)/nodes)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def config_pagecache(clients, run_gluten=True):\n",
    "    for l in clients:\n",
    "        if run_gluten:\n",
    "            !ssh root@$l \"echo 80 > /proc/sys/vm/dirty_ratio\"\n",
    "            !ssh root@$l \"echo 50 > /proc/sys/vm/dirty_background_ratio\"\n",
    "            !ssh root@$l \"echo 360000 > /proc/sys/vm/dirty_expire_centisecs\"\n",
    "            !ssh root@$l \"echo 3000 > /proc/sys/vm/dirty_writeback_centisecs\"\n",
    "\n",
    "        else:\n",
    "            !ssh root@$l \"echo 10 > /proc/sys/vm/dirty_ratio\"\n",
    "            !ssh root@$l \"echo 20 > /proc/sys/vm/dirty_background_ratio\"\n",
    "            !ssh root@$l \"echo 3000 > /proc/sys/vm/dirty_expire_centisecs\"\n",
    "            !ssh root@$l \"echo 500 > /proc/sys/vm/dirty_writeback_centisecs\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def print_kernel_params(clietns):\n",
    "    params = {\n",
    "        'transparent hugepage': '/sys/kernel/mm/transparent_hugepage/enabled',\n",
    "        'auto numa balancing': '/proc/sys/kernel/numa_balancing',\n",
    "        'scaling governor': '/sys/devices/system/cpu/cpu*/cpufreq/scaling_governor',\n",
    "        'scaling max freq': '/sys/devices/system/cpu/cpu*/cpufreq/scaling_max_freq',\n",
    "        'scaling cur freq': '/sys/devices/system/cpu/cpu*/cpufreq/scaling_cur_freq',\n",
    "        'power & perf policy': '/sys/devices/system/cpu/cpu*/power/energy_perf_bias',\n",
    "        'dirty_ratio': '/proc/sys/vm/dirty_ratio',\n",
    "        'dirty_background_ratio': '/proc/sys/vm/dirty_background_ratio',\n",
    "        'dirty_expire_centisecs': '/proc/sys/vm/dirty_expire_centisecs',\n",
    "        'dirty_writeback_centisecs': '/proc/sys/vm/dirty_writeback_centisecs'\n",
    "    }\n",
    "    for k, param in params.items():\n",
    "        print()\n",
    "        print(f'{k} ({param})')\n",
    "        for l in clients:\n",
    "            print(l + \": \", end='')\n",
    "            res = !ssh root@$l \"cat {param}\"\n",
    "            print(*res)\n",
    "    # print numactl\n",
    "    print()\n",
    "    print(\"numactl -H\")\n",
    "    for l in clients:\n",
    "        print(l + \":\")\n",
    "        res = !ssh $l \"numactl -H\"\n",
    "        print('\\n'.join(res))\n",
    "    # print memory freq\n",
    "    print()\n",
    "    print(\"Memory Frequency\")\n",
    "    for l in clients:\n",
    "        print(l + \":\")\n",
    "        res= !ssh root@$l \"dmidecode -t memory | grep Speed\"\n",
    "        print('\\n'.join(res))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "code_folding": []
   },
   "outputs": [],
   "source": [
    "def dropcache(clients):\n",
    "    for l in clients:\n",
    "        !ssh root@$l \"sync && echo 3 > /proc/sys/vm/drop_caches; echo 1 >/proc/sys/vm/compact_memory; free -h\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def config_mem_cgroup(clients):\n",
    "    mem_cgroup = \"\"\"\n",
    "CGROUP_ROOT=/sys/fs/cgroup/gluten\n",
    "\n",
    "if [ ! -d $CGROUP_ROOT ] ; then\n",
    "        sudo mkdir $CGROUP_ROOT\n",
    "        # enable memory for subtree\n",
    "        sudo bash -c \"echo '+memory' >> $CGROUP_ROOT/cgroup.subtree_control\"\n",
    "fi\n",
    "\n",
    "# move each process to sub memory group\n",
    "index=0\n",
    "for pid in `jps | grep Coarse | awk '{print $1}'` ; do\n",
    "        target_cgroup=$CGROUP_ROOT/mem-${index}\n",
    "        if [ ! -d $target_cgroup ] ; then\n",
    "                sudo mkdir $target_cgroup\n",
    "        fi\n",
    "        proc_file=$target_cgroup/cgroup.procs\n",
    "        sudo bash -c \"echo $pid >> $proc_file\"\n",
    "        index=`expr $index + 1`\n",
    "done\n",
    "    \"\"\"\n",
    "    with open(f'{home}/mem-cgroup.sh', 'w+') as f:\n",
    "        f.writelines(mem_cgroup)\n",
    "    for l in clients:\n",
    "        !scp {home}/mem-cgroup.sh {l}:{home}/ >/dev/null 2>&1\n",
    "        !ssh {l} \"bash {home}/mem-cgroup.sh >/dev/null 2>&1 &\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import matplotlib.pyplot as plt\n",
    "import os\n",
    "\n",
    "\n",
    "from IPython.display import display, HTML\n",
    "\n",
    "def get_io_stats(appid,  client):\n",
    "    file_path = os.path.join(home,'profile',appid,client)\n",
    "    statf = [f for f in os.listdir(file_path) if f.endswith('.stat')]\n",
    "    statmap=[]\n",
    "    for f in statf:\n",
    "        statmap.append({'pid':f[:-len(\".stat\")]})\n",
    "        with open(os.path.join(file_path, f),\"r\") as fi:\n",
    "            cnts=fi.readlines()\n",
    "        for l in cnts:\n",
    "            for fld in ['rchar','wchar','syscr','syscw','read_bytes','write_bytes','cancelled_write_bytes']:\n",
    "                if l.startswith(fld):\n",
    "                    if not fld in statmap[-1]:\n",
    "                        statmap[-1][fld]=int(l.split(\" \")[-1].strip())\n",
    "                    else:\n",
    "                        statmap[-1][fld]=(int(l.split(\" \")[-1].strip())-statmap[-1][fld])/1024/1024/1024\n",
    "\n",
    "    df = pd.DataFrame(statmap).drop('pid', axis=1).sum().to_frame()\n",
    "    df.columns = ['sum']\n",
    "    return df\n",
    "\n",
    "# Preprocess 'time' column\n",
    "def process_time(dataframes):\n",
    "    for df in dataframes:\n",
    "        df.columns=['time']+list(df.columns[1:])\n",
    "        df = df[df.time != 'Average:']\n",
    "        df['time'] = pd.to_datetime(df['time'], format='%H:%M:%S').dt.time\n",
    "        df['time'] = df['time'].apply(lambda dt: dt.hour*3600 + dt.minute*60 + dt.second)\n",
    "\n",
    "        offset = 12 * 3600 # half-day seconds\n",
    "        for i in range(1, len(df)):\n",
    "            if df['time'].iloc[i] < df['time'].iloc[i-1]:  # Detect AM->PM or PM->AM\n",
    "                for j in range(i, len(df)): # Apply offset until end\n",
    "                    df['time'].iloc[j] += offset\n",
    "\n",
    "        df['time'] = df['time'].astype(int)\n",
    "        yield df\n",
    "\n",
    "def draw_sar(appid, qtime=None, disk_dev=None, nic_dev=None, client=None):\n",
    "    if client is None:\n",
    "        client = clients[0]\n",
    "\n",
    "    display(HTML('<font size=6pt color=red>{:s}</font>'.format(client)))\n",
    "\n",
    "    display(get_io_stats(appid, client))\n",
    "\n",
    "    # Read data\n",
    "    profile_dir = os.path.join(home,'profile',appid,client)\n",
    "    datafiles = [os.path.join(profile_dir, datafile) for datafile in ['sar_cpu.sar', 'sar_mem.sar', 'sar_disk.sar', 'sar_nic.sar', 'sar_page.sar']]\n",
    "    dataframes = [pd.read_csv(datafile, header=1, delim_whitespace=True, parse_dates=True) for datafile in datafiles]\n",
    "  \n",
    "    num_figs=5\n",
    "    fig, axs=plt.subplots(num_figs,1,sharex=True,figsize=(30,5*4))\n",
    "\n",
    "    [cpu_df, mem_df, disk_df, nic_df, page_df] = process_time(dataframes)\n",
    "\n",
    "    # CPU usage\n",
    "    cpu_df['total'] = cpu_df['%user'] + cpu_df['%system'] + cpu_df['%iowait']\n",
    "\n",
    "    starttime = cpu_df[cpu_df['total'] > 50]['time'].min() - 1\n",
    "    cpu_df['time'] -= starttime\n",
    "\n",
    "    axs[4].stackplot(cpu_df['time'], cpu_df['%user'], cpu_df['%system'], cpu_df['%iowait'], labels=['user','system','iowait'])\n",
    "    axs[4].legend(loc='upper left')\n",
    "\n",
    "    # Memory usage\n",
    "    mem_df['dirty_cached'] = mem_df['kbdirty'] * mem_df['%memused'] / mem_df['kbmemused']\n",
    "    mem_df['clean_cached'] = (mem_df['kbcached'] - mem_df['kbdirty']) * mem_df['%memused'] / mem_df['kbmemused']\n",
    "    mem_df['used'] = mem_df['kbmemused'] * mem_df['%memused'] / mem_df['kbmemused']\n",
    "#     mem_df['used'] = (mem_df['kbmemused'] - mem_df['kbbuffers'] - mem_df['kbcached'])* mem_df['%memused'] / mem_df['kbmemused']\n",
    "\n",
    "    mem_df['time'] -= starttime\n",
    "\n",
    "    axs[0].stackplot(mem_df['time'], mem_df['used'], mem_df['clean_cached'], mem_df['dirty_cached'], labels=['used','clean cached','dirty cached'])\n",
    "    axs[0].legend(loc='upper left')\n",
    "    axs[0].grid(axis = 'y')\n",
    "\n",
    "    # Disk usage\n",
    "    if disk_dev is not None:\n",
    "        disk_df = disk_df[disk_df['DEV'].isin(disk_dev)]\n",
    "    disk_df['rkB/s'] = disk_df['rkB/s'].astype(float)\n",
    "    disk_df['wkB/s'] = disk_df['wkB/s'].astype(float)\n",
    "    disk_df['%util'] = disk_df['%util'].astype(float)\n",
    "\n",
    "\n",
    "    disk_df = disk_df.groupby('time').agg({'rkB/s': 'sum', 'wkB/s': 'sum', '%util':'mean'}).reset_index()\n",
    "    disk_df['read'] = disk_df['rkB/s'] / 1024\n",
    "    disk_df['write'] = disk_df['wkB/s'] / 1024\n",
    "\n",
    "    disk_df['time'] -= starttime\n",
    "\n",
    "    axs[1].stackplot(disk_df['time'], disk_df['read'], disk_df['write'], labels=['read MB/s','write MB/s'])\n",
    "    axs[1].grid(axis = 'y')\n",
    "\n",
    "    ax2 = axs[1].twinx()\n",
    "\n",
    "    ax2.plot(disk_df['time'], disk_df['%util'],'g-')\n",
    "    axs[1].legend(loc='upper left')\n",
    "\n",
    "    \n",
    "    # Nic usage\n",
    "    if nic_dev is not None:\n",
    "        nic_df = nic_df[nic_df['IFACE'].isin(nic_dev)]\n",
    "    nic_df['rxkB/s'] = nic_df['rxkB/s'].astype(float)\n",
    "    nic_df['txkB/s'] = nic_df['txkB/s'].astype(float)\n",
    "    \n",
    "    nic_df = nic_df.groupby('time').agg({'rxkB/s': 'sum', 'txkB/s': \"sum\"}).reset_index()\n",
    "    nic_df['rx'] = nic_df['rxkB/s'] / 1024\n",
    "    nic_df['tx'] = nic_df['txkB/s'] / 1024\n",
    "    \n",
    "    nic_df['time'] -= starttime\n",
    "    \n",
    "    axs[2].stackplot(nic_df['time'], nic_df['rx'], nic_df['tx'], labels=['rx MB/s','tx MB/s'])\n",
    "    axs[2].legend(loc='upper left')\n",
    "    axs[2].grid(axis = 'y')\n",
    "\n",
    "    # Pagefaults\n",
    "    page_df['minflt/s'] = page_df['fault/s'] - page_df['majflt/s']\n",
    "    \n",
    "    page_df['time'] -= starttime\n",
    "\n",
    "    axs[3].stackplot(page_df['time'], page_df['minflt/s'], page_df['majflt/s'], labels=['minor_fault/s','major_fault/s'])\n",
    "    axs[3].legend(loc='upper left')\n",
    "    axs[3].grid(axis = 'y')\n",
    "\n",
    "    # Add vertical lines and text for qtime, and calculate per query cpu%\n",
    "    if qtime is not None:\n",
    "        for ax in axs:\n",
    "            x = 0\n",
    "            ax.axvline(x = x, color = 'b')\n",
    "            for k, v in qtime.items():\n",
    "                x += v\n",
    "                ax.axvline(x = x, color = 'b')\n",
    "\n",
    "            tx = 0\n",
    "            for k, v in qtime.items():\n",
    "                if v / x > 15 / 772:\n",
    "                    ax.text(tx + v / 2 - 6 * x / 772, ax.get_ylim()[1] * 1.05, k)\n",
    "                tx += v\n",
    "\n",
    "        x = 0\n",
    "        qtime_se = {}\n",
    "        cols = ['%user','%system','%iowait']\n",
    "        for k, v in qtime.items():\n",
    "            filtered_df = cpu_df[(cpu_df['time'] >= x) & (cpu_df['time'] <= x+v)]\n",
    "            averages = filtered_df[cols].mean()\n",
    "            qtime_se[k] = averages.tolist()\n",
    "            x += v\n",
    "        if qtime_se:\n",
    "            perqcpu = pd.DataFrame(qtime_se).T\n",
    "            perqcpu.columns = cols\n",
    "            display(perqcpu)\n",
    "\n",
    "    plt.show()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def convert_to_etc_gmt(tz_offset=None):\n",
    "    # Run the 'date +%z' command and get the output\n",
    "    if not tz_offset:\n",
    "        tz_offset = !date +%z\n",
    "        tz_offset = tz_offset[0]\n",
    "    \n",
    "    # Extract the sign and the hour/minute offset\n",
    "    sign = tz_offset[0]\n",
    "    hours = int(tz_offset[1:3])\n",
    "    minutes = int(tz_offset[3:])\n",
    "\n",
    "    # Convert the offset to a GMT value\n",
    "    gmt_offset = hours + (minutes / 60)\n",
    "    if sign == '+':\n",
    "        gmt_offset = -gmt_offset\n",
    "    else:\n",
    "        gmt_offset = abs(gmt_offset)\n",
    "\n",
    "    # Construct the Etc/GMT string\n",
    "    etc_gmt = f\"Etc/GMT{int(gmt_offset):+d}\"\n",
    "    return etc_gmt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_last_run(records_file, appid=''):\n",
    "    if os.path.exists(records_file):\n",
    "        if appid:\n",
    "            lines=!tail -n100 $records_file\n",
    "            if len(lines) > 1:\n",
    "                # Check appid match\n",
    "                last_appid = lines[-1].split('\\t')[1]\n",
    "                if last_appid != appid:\n",
    "                    print(f'appid not match. Required {appid}. Got {last_appid}')\n",
    "                else:\n",
    "                    for line in lines[:-1][::-1]:\n",
    "                        l=line.split('\\t')\n",
    "                        if 'main' in l[3]:\n",
    "                            return l[1],l[2],l[3]\n",
    "        else:\n",
    "            lines=!tail -n1 $records_file\n",
    "            if len(lines) == 1:\n",
    "                l=lines[0].split('\\t')\n",
    "                return l[1],l[2],l[3]\n",
    "    return None, None, None"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# TestTPC"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "code_folding": []
   },
   "outputs": [],
   "source": [
    "import os\n",
    "import socket\n",
    "from dataclasses import dataclass\n",
    "from functools import wraps\n",
    "from pathlib import Path\n",
    "from typing import List \n",
    "\n",
    "class TestTPC:\n",
    "    @dataclass\n",
    "    class query_info:\n",
    "        tables: List[str]\n",
    "        sql: List[str]\n",
    "\n",
    "    query_infos = {}\n",
    "    query_ids =[]\n",
    "\n",
    "    tpctables=[]\n",
    "    tpc_query_path = ''\n",
    "    \n",
    "    RECORDS_SPARK_TPCH = f\"records_spark_tpch_spark{spark_version_short}.csv\"\n",
    "    RECORDS_SPARK_TPCDS = f\"records_spark_tpcds_spark{spark_version_short}.csv\"\n",
    "    RECORDS_GLUTEN_TPCH = f\"records_gluten_tpch_spark{spark_version_short}.csv\"\n",
    "    RECORDS_GLUTEN_TPCDS = f\"records_gluten_tpcds_spark{spark_version_short}.csv\"\n",
    "    \n",
    "    def __init__(self, spark, table_dir, run_gluten, workload, server, base_dir, nb_name, data_source = 'parquet'):\n",
    "        self.spark = spark\n",
    "        self.sc = spark.sparkSession.sparkContext\n",
    "        self.appid = self.sc.applicationId\n",
    "        self.app_name = '_'.join(self.sc.appName.split(' '))\n",
    "        self.run_gluten = run_gluten\n",
    "        self.workload = workload\n",
    "        self.table_dir = table_dir\n",
    "        self.server = server\n",
    "        self.base_dir = base_dir\n",
    "        self.nb_name = nb_name\n",
    "        self.data_source = data_source\n",
    "        self.table_loaded = False\n",
    "        self.result = {}\n",
    "        self.duration = 0\n",
    "        self.stopped = False\n",
    "        self.collect_emon = False\n",
    "        self.perf_html = ''\n",
    "        self.finished_nb = ''\n",
    "        for l in os.listdir(self.tpc_query_path):\n",
    "            if (l[-3:] == 'sql'):\n",
    "                with open(self.tpc_query_path+l,\"r\") as f:\n",
    "                    self.query_infos[l.split(\".\")[0]]=self.query_info(self.tpctables,[\"\\n\".join(f.readlines())])\n",
    "        self.query_ids = sorted(self.query_infos.keys(), key=lambda x: str(len(x))+x if x[-1] != 'a' and x[-1] != 'b' else str(len(x)-1) + x)\n",
    "        print(\"http://{}:18080/history/{}/jobs/\".format(local_ip, self.sc.applicationId))\n",
    "    \n",
    "    def start_monitor(self, clients, emon_list='', **kw):\n",
    "        if emon_list:\n",
    "            self.collect_emon = True\n",
    "        startmonitor(clients, self.appid, self.collect_emon, **kw)\n",
    "    \n",
    "    def stop_monitor(self, clients, **kw):\n",
    "        if self.stopped:\n",
    "            return\n",
    "        stopmonitor(clients, self.sc, self.appid, self.result, self.collect_emon, **kw)\n",
    "\n",
    "        output_nb = f'{self.nb_name[:-6]}-{self.appid}.ipynb'\n",
    "        \n",
    "        record_file = ''\n",
    "        if self.workload == 'tpch':\n",
    "            if self.run_gluten:\n",
    "                record_file = self.RECORDS_GLUTEN_TPCH\n",
    "            else:\n",
    "                record_file = self.RECORDS_SPARK_TPCH\n",
    "        else:\n",
    "            if self.run_gluten:\n",
    "                record_file = self.RECORDS_GLUTEN_TPCDS\n",
    "            else:\n",
    "                record_file = self.RECORDS_SPARK_TPCDS\n",
    "        record_file = os.path.join(cwd, record_file)\n",
    "        with open(record_file, 'a+') as f:\n",
    "            f.write(f'{datetime.now()}\\t{self.appid}\\t{self.base_dir}\\t{self.app_name}\\t{output_nb}\\t{self.duration}\\n')\n",
    "\n",
    "        if self.server:\n",
    "            if output_nb.startswith(cwd):\n",
    "                output_nb = os.path.relpath(output_nb, cwd)\n",
    "                self.finished_nb = f\"http://{localhost}:8888/tree/{output_nb}\"\n",
    "            upload_profile(self.server, self.base_dir, self.appid)\n",
    "        \n",
    "        self.stopped = True\n",
    "\n",
    "    def run_perf_analysis(self, server_gluten_home, disk_dev, nic_dev, proxy='', emails=[], pr=''):\n",
    "        if not self.server:\n",
    "            return\n",
    "\n",
    "        run_script=f'{server_gluten_home}/tools/workload/benchmark_velox/analysis/run-perf-analysis.sh'\n",
    "\n",
    "        disk=','.join(disk_dev)\n",
    "        nic=','.join(nic_dev)\n",
    "\n",
    "        command =' '.join(['bash', run_script, '--base-dir', self.base_dir, '--name', self.app_name, '--appid', self.appid, '--disk', disk, '--nic', nic, '--tz', convert_to_etc_gmt(), '--proxy', proxy if proxy != '' else \"''\", '--emails', ','.join(emails) if emails else \"''\", '--pr', pr if pr != '' and pr.isdigit() else \"''\"])\n",
    "        \n",
    "        if self.run_gluten:\n",
    "            if self.workload == 'tpch':\n",
    "                comp_file = os.path.join(cwd, self.RECORDS_GLUTEN_TPCH)\n",
    "                baseline_file = os.path.join(cwd, self.RECORDS_SPARK_TPCH)\n",
    "            else:\n",
    "                comp_file = os.path.join(cwd, self.RECORDS_GLUTEN_TPCDS)\n",
    "                baseline_file = os.path.join(cwd, self.RECORDS_SPARK_TPCDS)\n",
    "            comp_appid, comp_base_dir, comp_name = get_last_run(comp_file, self.appid)\n",
    "            if comp_appid:\n",
    "                command += ' '.join(['', '--comp-appid', comp_appid, '--comp-base-dir', comp_base_dir, '--comp-name', comp_name])\n",
    "            baseline_appid, baseline_base_dir, _ = get_last_run(baseline_file, '')\n",
    "            if baseline_appid:\n",
    "                command += ' '.join(['', '--baseline-appid', baseline_appid, '--baseline-base-dir', baseline_base_dir])\n",
    "        print(command)\n",
    "\n",
    "        # Block if running on local cluster.\n",
    "        if self.server == localhost:\n",
    "            !ssh {self.server} \"{command} > /dev/null 2>&1\"\n",
    "        else:\n",
    "            !ssh {self.server} \"{command} > /dev/null 2>&1 &\"\n",
    "\n",
    "        self.perf_html=f'http://{self.server}:8889/view/{self.base_dir}/html/{self.app_name}_{self.appid}.html'\n",
    "        display(HTML(f'<a href=\"{self.perf_html}\">{self.perf_html}</a>'))\n",
    "        \n",
    "    def load_table(self, table):\n",
    "        if type(self.table_dir)==list:\n",
    "            return self.spark.read.format(self.data_source).load([os.path.join(t, table) for t in self.table_dir])\n",
    "        else:\n",
    "            return self.spark.read.format(self.data_source).load(os.path.join(self.table_dir, table))\n",
    "    \n",
    "    def load_tables_as_tempview(self, tables):\n",
    "        for table in tables:\n",
    "            df = self.load_table(table)\n",
    "            df.createOrReplaceTempView(table)\n",
    "        \n",
    "    def load_all_tables_as_tempview(self):\n",
    "        print(f\"Loading all tables: {self.tpctables}\")\n",
    "        self.load_tables_as_tempview(self.tpctables)\n",
    "    \n",
    "    def load_query(self, query):\n",
    "        info = self.query_infos[query]\n",
    "        return [self.spark.sql(q) for q in info.sql]\n",
    "    \n",
    "    def run_query(self, query, explain = False, print_result=False, load_table=True):\n",
    "        if load_table:\n",
    "            self.load_all_tables_as_tempview()\n",
    "        start_time = timeit.default_timer()\n",
    "        print(\"start query \" + query + \", application id \" + self.sc.applicationId)\n",
    "        print(\"{} : {}\".format(\"Start time\", start_time))\n",
    "        self.sc.setJobDescription(query)\n",
    "\n",
    "        queries = self.load_query(query)\n",
    "        for q in queries:\n",
    "            if explain: q.explain()\n",
    "            collect=q.collect()\n",
    "        end_time = timeit.default_timer()\n",
    "        duration = end_time - start_time\n",
    "        display(HTML(('Completed Query. Time(sec): <font size=6pt color=red>{:f}</font>'.format(duration))))\n",
    "        \n",
    "        self.result[query] = duration\n",
    "        self.duration += float(duration)\n",
    "        if print_result:\n",
    "            print(collect)\n",
    "\n",
    "    def power_run(self, explain=False, print_result=False, load_table=True):\n",
    "        if load_table:\n",
    "            self.load_all_tables_as_tempview()\n",
    "        for l in self.query_ids:\n",
    "            self.run_query(l, explain=explain, print_result=print_result, load_table=False)\n",
    "\n",
    "    def print_result(self):\n",
    "        print(self.result)\n",
    "        print()\n",
    "        print(f\"total duration:\\n{self.duration}\\n\")\n",
    "        if self.server:\n",
    "            print(self.finished_nb)\n",
    "            print(f\"http://{self.server}:1088/tracing_examples/trace_viewer.html#/tracing/test_data/{self.appid}.json\")\n",
    "            print(f\"http://{self.server}:18080/history/{self.appid}\")\n",
    "            print(self.perf_html)\n",
    "        print(self.appid)\n",
    "        for t in self.result.values():\n",
    "            print(t)\n",
    "    \n",
    "class TestTPCH(TestTPC):\n",
    "    tpctables = ['customer', 'lineitem', 'nation', 'orders', 'part', 'partsupp', 'region', 'supplier']\n",
    "    tpc_query_path = f'{gluten_home}/tools/gluten-it/common/src/main/resources/tpch-queries/'\n",
    "        \n",
    "    def __init__(self, spark, table_dir, run_gluten, server, base_dir, nb_name, data_source = 'parquet'):\n",
    "        TestTPC.__init__(self,spark, table_dir, run_gluten, 'tpch', server, base_dir, nb_name, data_source)\n",
    "                \n",
    "class TestTPCDS(TestTPC):\n",
    "    tpctables = [ 'call_center',\n",
    "         'catalog_page',\n",
    "         'catalog_returns',\n",
    "         'catalog_sales',\n",
    "         'customer',\n",
    "         'customer_address',\n",
    "         'customer_demographics',\n",
    "         'date_dim',\n",
    "         'household_demographics',\n",
    "         'income_band',\n",
    "         'inventory',\n",
    "         'item',\n",
    "         'promotion',\n",
    "         'reason',\n",
    "         'ship_mode',\n",
    "         'store',\n",
    "         'store_returns',\n",
    "         'store_sales',\n",
    "         'time_dim',\n",
    "         'warehouse',\n",
    "         'web_page',\n",
    "         'web_returns',\n",
    "         'web_sales',\n",
    "         'web_site']\n",
    "    tpc_query_path = f'{gluten_home}/tools/gluten-it/common/src/main/resources/tpcds-queries/'\n",
    "    \n",
    "    def __init__(self, spark, table_dir, run_gluten, server, base_dir, nb_name, data_source = 'parquet'):\n",
    "        TestTPC.__init__(self,spark, table_dir, run_gluten, 'tpcds', server, base_dir, nb_name, data_source)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create SparkContext"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## default config"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "code_folding": []
   },
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "def findjemalloc():\n",
    "    l = clients[0]\n",
    "    jemallocDir = !ssh $l \"whereis libjemalloc.so.2\"\n",
    "    libjemalloc = jemallocDir[0].split(' ')\n",
    "    return libjemalloc[1]\n",
    "\n",
    "def get_py4jzip():\n",
    "    spark_home=os.environ['SPARK_HOME']\n",
    "    py4jzip = !ls {spark_home}/python/lib/py4j*.zip\n",
    "    return py4jzip[0]\n",
    "\n",
    "def default_conf(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars='', app_name='', master='yarn', run_gluten=False):\n",
    "    # Create a temp directory that gets cleaned up on exit\n",
    "    output_dir = os.path.abspath(tempfile.mkdtemp())\n",
    "    def cleanup():\n",
    "        shutil.rmtree(output_dir, True)\n",
    "    atexit.register(cleanup)\n",
    "    signal.signal(signal.SIGTERM, cleanup)\n",
    "\n",
    "##################################################\n",
    "    def convert_to_bytes(size):\n",
    "        units = {'k': 1, 'm': 2, 'g': 3}\n",
    "        size = size.lower()\n",
    "        if size[-1] in units:\n",
    "            return int(size[:-1]) * 1024 ** units[size[-1]]\n",
    "        else:\n",
    "            return int(size)\n",
    "\n",
    "    def yarn_padding(size):\n",
    "        min_size =  convert_to_bytes('1g')\n",
    "        step = min_size\n",
    "        while size > min_size:\n",
    "            min_size += step\n",
    "        return min_size - size\n",
    "    \n",
    "    num_nodes = len(clients)\n",
    "    num_executors = num_nodes*executors_per_node\n",
    "    parallelism = num_executors*cores_per_executor*task_per_core\n",
    "\n",
    "    if run_gluten:\n",
    "        offheap_ratio = gluten_offheap_ratio\n",
    "    else:\n",
    "        offheap_ratio = spark_offheap_ratio\n",
    "    driver_memory = convert_to_bytes('20g')\n",
    "    executor_memory_overhead = convert_to_bytes('1g')\n",
    "    \n",
    "    # Minimun executor memory\n",
    "    min_memory = convert_to_bytes('1g')\n",
    "\n",
    "    # Calculate executor onheap memory\n",
    "    num_driver = 1 if localhost in clients else 0\n",
    "    executor_memory = math.floor((convert_to_bytes(memory_per_node) - (executor_memory_overhead + min_memory)*executors_per_node - (driver_memory + min_memory)*num_driver)/(offheap_ratio*num_driver + (1+offheap_ratio)*executors_per_node))\n",
    "    executor_memory = max(executor_memory, min_memory)\n",
    "    # Calculate driver/executor offheap memory in MB\n",
    "    #offheap_memory_per_node = convert_to_bytes(memory_per_node) - (executor_memory + executor_memory_overhead) * executors_per_node\n",
    "    if offheap_ratio > 0:\n",
    "        enable_offheap = True\n",
    "        offheap_memory = math.floor(executor_memory*offheap_ratio)\n",
    "    else:\n",
    "        enable_offheap = False\n",
    "        offheap_memory = 0\n",
    "\n",
    "    byte_to_mb = lambda x: int(x/(1024 ** 2))\n",
    "    driver_memory_mb = byte_to_mb(driver_memory)\n",
    "    executor_memory_overhead_mb = byte_to_mb(executor_memory_overhead)\n",
    "    executor_memory_mb = byte_to_mb(executor_memory)\n",
    "    offheap_memory_mb = byte_to_mb(offheap_memory)\n",
    "    \n",
    "    executor_totalmem_mb = executor_memory_overhead_mb + executor_memory_mb + offheap_memory_mb\n",
    "    executor_totalmem_mb = yarn_padding(executor_totalmem_mb)\n",
    "    if byte_to_mb(convert_to_bytes(memory_per_node)) - executor_totalmem_mb*executors_per_node > executor_totalmem_mb:\n",
    "        executor_memory_overhead_mb += 1024\n",
    "    \n",
    "    print('''\n",
    "        executors per node: {:d}\n",
    "        parallelism: {:d}\n",
    "        executor memory: {:d}m\n",
    "        offheap memory: {:d}m\n",
    "    '''.format(executors_per_node, parallelism, executor_memory_mb, offheap_memory_mb))\n",
    "\n",
    "    conf = SparkConf() \\\n",
    "        .set('spark.app.name', app_name)\\\n",
    "        .set('spark.master',master)\\\n",
    "        .set('spark.executor.memory', '{:d}m'.format(executor_memory_mb))\\\n",
    "        .set('spark.memory.offHeap.enabled', enable_offheap)\\\n",
    "        .set('spark.memory.offHeap.size','{:d}m'.format(offheap_memory_mb))\\\n",
    "        .set('spark.sql.shuffle.partitions', parallelism)\\\n",
    "        .set('spark.executor.instances', '{:d}'.format(num_executors))\\\n",
    "        .set('spark.executor.cores','{:d}'.format(cores_per_executor))\\\n",
    "        .set('spark.task.cpus','{:d}'.format(1))\\\n",
    "        .set('spark.driver.memory', '{:d}m'.format(driver_memory_mb))\\\n",
    "        .set('spark.executor.memoryOverhead', '{:d}m'.format(executor_memory_overhead_mb))\\\n",
    "        .set('spark.driver.maxResultSize', '4g')\\\n",
    "        .set('spark.executor.extraJavaOptions',\\\n",
    "            f'-XX:+UseParallelOldGC -XX:ParallelGCThreads=2 -XX:NewRatio=1 -XX:SurvivorRatio=1 -XX:+UseCompressedOops -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:ErrorFile={home}/logs/java/hs_err_pid%p.log')\\\n",
    "        .set('spark.driver.extraClassPath', extra_jars) \\\n",
    "        .set('spark.executor.extraClassPath', extra_jars) \\\n",
    "        .set('spark.executorEnv.PYTHONPATH',f\"{os.environ['SPARK_HOME']}python:{get_py4jzip()}\") \\\n",
    "        .set(\"spark.repl.class.outputDir\", output_dir) \\\n",
    "        .set(\"spark.sql.broadcastTimeout\", \"4800\") \\\n",
    "        .set('spark.serializer','org.apache.spark.serializer.KryoSerializer')\\\n",
    "        .set('spark.kryoserializer.buffer.max','512m')\\\n",
    "        .set('spark.kryo.unsafe',False)\\\n",
    "        .set('spark.sql.adaptive.enabled',True)\\\n",
    "        .set('spark.sql.autoBroadcastJoinThreshold',\"10m\")\\\n",
    "        .set('spark.sql.catalogImplementation','hive')\\\n",
    "        .set('spark.sql.optimizer.dynamicPartitionPruning.enabled',True)\\\n",
    "        .set('spark.cleaner.periodicGC.interval', '10s')\n",
    "\n",
    "    return conf\n",
    "\n",
    "\n",
    "def create_cntx_with_config(conf,conf_overwrite=None):\n",
    "\n",
    "    importlib.reload(pyspark.java_gateway)\n",
    "\n",
    "    def Popen(*args, **kwargs):\n",
    "        \"\"\"Wraps subprocess.Popen to force stdout and stderr from the child process\n",
    "        to pipe to this process without buffering.\n",
    "        \"\"\"\n",
    "        global spark_jvm_proc\n",
    "        # Override these in kwargs to avoid duplicate value errors\n",
    "        # Set streams to unbuffered so that we read whatever bytes are available\n",
    "        # when ready, https://docs.python.org/3.6/library/subprocess.html#popen-constructor\n",
    "        kwargs['bufsize'] = 0\n",
    "        # Capture everything from stdout for display in the notebook\n",
    "        kwargs['stdout'] = subprocess.PIPE\n",
    "        print(\"java proc gateway popen\")\n",
    "        spark_jvm_proc = subprocess.Popen(*args, **kwargs)\n",
    "        return spark_jvm_proc\n",
    "    pyspark.java_gateway.Popen = Popen\n",
    "\n",
    "    spylon_kernel.scala_interpreter.scala_intp=None\n",
    "    \n",
    "    if conf_overwrite is not None:\n",
    "        conf=conf_overwrite(conf)\n",
    "    print(\"spark.serializer: \",conf.get(\"spark.serializer\"))\n",
    "    print(\"master: \",conf.get(\"spark.master\"))\n",
    "    \n",
    "    sc = SparkContext(conf = conf,master=conf.get(\"spark.master\"))\n",
    "    sc.setLogLevel('ERROR')\n",
    "    \n",
    "    sc.addPyFile(f\"{os.environ['SPARK_HOME']}/python/lib/pyspark.zip\")\n",
    "    sc.addPyFile(get_py4jzip())\n",
    "    \n",
    "    spark = SQLContext(sc)\n",
    "    \n",
    "    time.sleep(30)\n",
    "    \n",
    "    for client in clients:\n",
    "        pids=!ssh $client \"jps | grep CoarseGrainedExecutorBackend | cut -d' ' -f1\"\n",
    "        print(client,\":\",len(pids),\" \",\"\\t\".join(map(str,pids)))\n",
    "        \n",
    "    spark_session = SparkSession(sc)\n",
    "    spark_jvm_helpers = SparkJVMHelpers(spark_session._sc)\n",
    "    spylon_kernel.scala_interpreter.spark_state = spylon_kernel.scala_interpreter.SparkState(spark_session, spark_jvm_helpers, spark_jvm_proc)\n",
    "    \n",
    "    print(\"appid: \",sc.applicationId)\n",
    "    print(\"SparkConf:\")\n",
    "\n",
    "    df = pd.DataFrame(sc.getConf().getAll(), columns=['key', 'value'])\n",
    "    display(df)\n",
    "\n",
    "    return sc, spark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def spark_tpch_conf_overwrite(conf):\n",
    "    return conf\n",
    "\n",
    "def spark_tpcds_conf_overwrite(conf):\n",
    "    conf.set('spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold', '0')\\\n",
    "        .set('spark.sql.optimizer.runtime.bloomFilter.enabled', 'true')\n",
    "    return conf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_cntx_spark(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name='', master='yarn', conf_overwrite=None):\n",
    "    conf = default_conf(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name, master, run_gluten=False)\n",
    "    conf.set(\"spark.sql.execution.arrow.maxRecordsPerBatch\",20480)\\\n",
    "        .set(\"spark.sql.parquet.columnarReaderBatchSize\",20480)\\\n",
    "        .set(\"spark.sql.inMemoryColumnarStorage.batchSize\",20480)\n",
    "    return create_cntx_with_config(conf,conf_overwrite)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Gluten"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def gluten_tpch_conf_overwrite(conf):\n",
    "    return conf\n",
    "\n",
    "def gluten_tpcds_conf_overwrite(conf):\n",
    "    conf.set('spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold', '0')\\\n",
    "        .set('spark.sql.optimizer.runtime.bloomFilter.enabled', 'true')\\\n",
    "        .set('spark.gluten.sql.columnar.joinOptimizationLevel', '18')\\\n",
    "        .set('spark.gluten.sql.columnar.physicalJoinOptimizeEnable', 'true')\\\n",
    "        .set('spark.gluten.sql.columnar.physicalJoinOptimizationLevel', '18')\\\n",
    "    return conf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_cntx_gluten(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name='', master='yarn', conf_overwrite=None):\n",
    "    conf = default_conf(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name, master, run_gluten=True)\n",
    "    conf.set('spark.sql.files.maxPartitionBytes', '4g')\\\n",
    "        .set('spark.plugins','org.apache.gluten.GlutenPlugin')\\\n",
    "        .set('spark.shuffle.manager','org.apache.spark.shuffle.sort.ColumnarShuffleManager')\\\n",
    "        .set('spark.gluten.sql.columnar.backend.lib','velox')\\\n",
    "        .set('spark.gluten.sql.columnar.maxBatchSize',4096)\\\n",
    "        .set('spark.gluten.sql.columnar.forceShuffledHashJoin',True)\\\n",
    "        .set('spark.executorEnv.LD_PRELOAD', findjemalloc())\\\n",
    "        .set('spark.gluten.sql.columnar.coalesce.batches', 'true')\n",
    "    \n",
    "    return create_cntx_with_config(conf,conf_overwrite)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Context"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_cntx(run_gluten=False, workload='tpch', app_conf_overwrite=None, server='', base_dir='', nb_name='tpc_workload.ipynb', app_name=''):\n",
    "    table_dir=''\n",
    "    extra_jars = ''\n",
    "    is_tpch_workload=False\n",
    "    is_tpcds_workload=False\n",
    "    app_name_suffix=''\n",
    "    workload_conf_overwrite=None\n",
    "    create_cntx_func=None\n",
    "    test_tpc=None\n",
    "\n",
    "    if workload.lower() == 'tpch':\n",
    "        app_name_suffix = f\"tpch_spark{spark_version_short}\"\n",
    "        tabledir = tpch_tabledir\n",
    "        is_tpch_workload=True\n",
    "    elif workload.lower() == 'tpcds':\n",
    "        app_name_suffix = f\"tpcds_spark{spark_version_short}\"\n",
    "        tabledir = tpcds_tabledir\n",
    "        is_tpcds_workload=True\n",
    "    else:\n",
    "        raise ValueError(f\"Unknown workload: {workload}\")\n",
    "\n",
    "    lastgit=!git --git-dir {gluten_home}/.git log --format=\"%H\" -n 1\n",
    "    lastgit = lastgit[0]\n",
    "    print(f'lastgit: {lastgit}')\n",
    "\n",
    "    nodes=len(clients)\n",
    "\n",
    "    if run_gluten:\n",
    "        jars_base=f\"{home}/jars/\"+lastgit\n",
    "        \n",
    "        for target_jar in gluten_target_jar.split(\",\"):\n",
    "            !ls -l {target_jar}\n",
    "            !mkdir -p {jars_base}\n",
    "            !rm -rf {jars_base}/*\n",
    "            !cp {target_jar} {jars_base}/\n",
    "            if target_jar[-4:] != '.jar':\n",
    "                !cp -f {target_jar} {jars_base}/gluten-{lastgit}.jar\n",
    "\n",
    "        jars=!ls -d {jars_base}/*.jar\n",
    "        extra_jars=\":\".join([\"file://\"+j for j in jars])\n",
    "        print(f'extra_jars: {extra_jars}')\n",
    "\n",
    "        for c in clients:\n",
    "            if c!=localhost:\n",
    "                !ssh {c} \"rm -rf {jars_base}\"\n",
    "                !ssh {c} \"mkdir -p {jars_base}\"\n",
    "                !scp {jars_base}/*.jar {c}:{jars_base} >/dev/null 2>&1\n",
    "\n",
    "        app_name_suffix = '_'.join(['gluten', app_name_suffix, lastgit[:6]])\n",
    "        create_cntx_func=create_cntx_gluten\n",
    "        if is_tpch_workload:\n",
    "            task_per_core = gluten_tpch_task_per_core\n",
    "            workload_conf_overwrite = gluten_tpch_conf_overwrite\n",
    "        elif is_tpcds_workload:\n",
    "            task_per_core = gluten_tpcds_task_per_core\n",
    "            workload_conf_overwrite = gluten_tpcds_conf_overwrite\n",
    "    else:\n",
    "        app_name_suffix = '_'.join(['spark', app_name_suffix, lastgit[:6]])\n",
    "        create_cntx_func=create_cntx_spark\n",
    "        if is_tpch_workload:\n",
    "            task_per_core = spark_tpch_task_per_core\n",
    "            workload_conf_overwrite = spark_tpch_conf_overwrite\n",
    "        elif is_tpcds_workload:\n",
    "            task_per_core = spark_tpcds_task_per_core\n",
    "            workload_conf_overwrite = spark_tpcds_conf_overwrite\n",
    "    \n",
    "    if app_name:\n",
    "        app_name = app_name + ' ' + app_name_suffix\n",
    "    else:\n",
    "        app_name = app_name_suffix\n",
    "\n",
    "    conf_overwrite = lambda conf: app_conf_overwrite(workload_conf_overwrite(conf))\n",
    "    \n",
    "    sc, spark = create_cntx_func(executors_per_node, cores_per_executor, task_per_core, memory_per_node, extra_jars, app_name, master, conf_overwrite)\n",
    "    \n",
    "    # Pin executors to numa nodes for Gluten\n",
    "    if run_gluten:\n",
    "        pinexecutor_numa(clients)\n",
    "\n",
    "    appid = sc.applicationId\n",
    "    print(\"start run: \", appid)\n",
    "    \n",
    "    if is_tpch_workload:\n",
    "        test_tpc = TestTPCH(spark, tabledir, run_gluten, server, base_dir, nb_name)\n",
    "    elif is_tpcds_workload:\n",
    "        test_tpc = TestTPCDS(spark, tabledir, run_gluten, server, base_dir, nb_name)\n",
    "    \n",
    "    return sc, spark, appid, test_tpc"
   ]
  }
 ],
 "metadata": {
  "hide_input": false,
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  },
  "nbTranslate": {
   "displayLangs": [
    "*"
   ],
   "hotkey": "alt-t",
   "langInMainMenu": true,
   "sourceLang": "en",
   "targetLang": "fr",
   "useGoogleTranslate": true
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": false,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {
    "height": "364.469px",
    "left": "2086.8px",
    "top": "150.516px",
    "width": "375px"
   },
   "toc_section_display": true,
   "toc_window_display": true
  },
  "toc-autonumbering": true,
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
